【译】异步计算:Web服务器+ Dask
让我们想象一个简单的Web服务器,它既可以快速加载页面,也可以在较慢的加载页面上执行一些计算。在我们的例子中,这将是一个简单的Fibonnaci服务应用程序,但您可以想象替换fib函数在一些输入数据上运行机器学习模型,从数据库中获取结果等。
import tornado.ioloop
import tornado.web
def fib(n):
if n < 2:
return n
else:
return fib(n - 1) + fib(n - 2)
class FibHandler(tornado.web.RequestHandler):
def get(self, n):
result = fib(int(n))
self.write(str(result))
class FastHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello!")
def make_app():
return tornado.web.Application([
(r"/fast", FastHandler),
(r"/fib/(\d+)", FibHandler),
])
app = make_app()
app.listen(8000)
速度¶
我们知道用户会将响应时间快速与否和网站权威内容和信任联系起来,因此我们希望衡量页面加载的速度。我们重点感兴趣的在于,在模拟我们的web服务器为许多用户提供服务时,在许多同时加载请求期间执行此操作的耗时。
import tornado.httpclient
client = tornado.httpclient.AsyncHTTPClient()
import tornado.gen
from time import time
@tornado.gen.coroutine
def measure(url, n=100):
""" Get url n times concurrently. Print duration. """
start = time()
futures = [client.fetch(url) for i in range(n)]
results = yield futures
end = time()
print(url, ', %d simultaneous requests, ' % n, 'total time: ', (end - start))
耗时¶
我们来看看以下几种执行情况的执行耗时
- Tornado 一次执行 fast 返回耗时 10ms
(实际上在我的机器上这个执行要更快)
- 执行100次 fast 大约在 100ms 左右, 所以这个是比较好的接近并发的效率
(实际上,tornado在没有特殊处理的时候,仍然是一个串型执行的过程,原文给的例子中看不出来,但是从我的执行结果中可以很好的看出来,实际上并没有并发)
- 调用 fib 函数需要的时间
- 调用 fib 100次需要大约100倍的时间,并没有那么多的并行性效率 (递归执行,这个差距更加明显)
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fast', n=1)
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fast', n=100)
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fib/28', n=1)
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fib/28', n=100)
异步阻塞¶
在下面的示例中,我们看到对路由fib/ 的一次缓慢调用会阻塞其他更快的请求:
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fib/35', n=1)
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fast', n=1)
讨论¶
这里存在两个问题:
我们所有的fib调用都是独立的,我们希望与多个内核或附近的集群并行运行这些计算。
我们缓慢的计算密集的fib函数的请求可能会妨碍我们的快速请求。一个慢用户可以影响其他所有人。
使用dask 进行异步进程计算¶
要解决这两个问题,我们将使用Dask将计算派发到其他进程或计算机中。因为Dask是一个异步框架,它可以很好地与Tornado或Asyncio集成。
from dask.distributed import Client
dask_client = Client(asynchronous=True) # use local processes for now
def fib(n):
if n < 2:
return n
else:
return fib(n - 1) + fib(n - 2)
class FibHandler(tornado.web.RequestHandler):
async def get(self, n):
future = dask_client.submit(fib, int(n)) # submit work to happen elsewhere
result = await future
self.write(str(result))
class MainHandler(tornado.web.RequestHandler):
async def get(self):
self.write("Hello, world")
def make_app():
return tornado.web.Application([
(r"/fast", MainHandler),
(r"/fib/(\d+)", FibHandler),
])
app = make_app()
app.listen(9000)
# Before parallelism
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fib/28', n=20)
# After parallelism
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:9000/fib/28', n=20)
异步计算¶
在之前当一个请求在忙于计算 fib(...) 陷入阻塞 的时候 Tornado 同样也被阻塞了。此时 Tornado 它无法处理任何其他请求。当我们的服务提供这些昂贵、便宜的计算时,这将会成为严重的问题,开销少的请求会因为这种不必要的请求而挂起。
由于Dask能够与Tornado或Asyncio等异步系统集成,因此我们的Web服务器可以在多个请求之间自由跳转,即使在后台进行计算时也是如此。在下面的例子中,我们看到即使首先开始慢速计算,快速计算也只需几毫秒就会返回。
# Before async
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fib/35', n=1)
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:8000/fast', n=1)
# After async
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:9000/fib/35', n=1)
tornado.ioloop.IOLoop.current().add_callback(measure, 'http://localhost:9000/fast', n=1) # 这个请求不会被阻塞
其他注意点¶
在这些情况下,今天人们倾向于使用 concurrent.futures 或 Celery。
- concurrent.futures 允许在一台机器上轻松实现并行性,并且可以很好地集成到异步框架中。 API正是我们上面展示的(Dask实现了concurrent.futures API)。但是,concurrent.futures不容易扩展到群集。
- Celery更容易扩展到多台机器,但是具有更高的延迟,不能很好地缩小,并且需要一些努力来集成到异步框架中(或者至少这是我的理解,我的经验很浅)
在这种情况下,Dask提供了两者的一些好处。它很容易在常见的单机情况下进行设置和使用,但也可以扩展到集群。它与异步框架很好地集成,只增加了非常小的延迟。
async def f():
start = time()
result = await dask_client.submit(lambda x: x + 1, 10)
end = time()
print('Roundtrip latency: %.2f ms' % ((end - start) * 1000))
tornado.ioloop.IOLoop.current().add_callback(f)
Comments !